Automated Test: kafka-consumer-parallel-after #328
Conversation
One potential problem we have with batch processing is that any one slow item will clog up the whole batch. This PR implements a queueing method instead, where we keep N queues that each have their own workers. There's still a chance of individual items backlogging a queue, but we can try increased concurrency here to reduce the chances of that happening.
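For illustration, here is a minimal sketch of the N-queue idea described above: hash the grouping key to pick a queue, so per-key ordering is preserved while different keys process in parallel. The names below are hypothetical and do not reflect the PR's actual FixedQueuePool API.

import queue
import threading
from collections.abc import Callable


class MiniQueuePool:
    """Toy N-queue pool: items with the same key always land on the same queue,
    so per-key ordering is preserved, and a slow item only backs up its own queue."""

    def __init__(self, num_queues: int, process: Callable[[object], None]) -> None:
        self.queues: list[queue.Queue] = [queue.Queue() for _ in range(num_queues)]
        for q in self.queues:
            threading.Thread(target=self._worker, args=(q, process), daemon=True).start()

    def submit(self, group_key: str, item: object) -> None:
        # A stable hash of the grouping key picks the queue for this item.
        self.queues[hash(group_key) % len(self.queues)].put(item)

    def _worker(self, q: queue.Queue, process: Callable[[object], None]) -> None:
        while True:
            process(q.get())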
📝 Walkthrough
This PR introduces a new "thread-queue-parallel" processing mode for Kafka message consumption: a multi-queue processing pipeline with per-group order preservation, thread-safe offset tracking, dedicated commit threads, and metrics integration. The implementation spans configuration updates, core queue infrastructure, consumer integration, and extensive test coverage.
Sequence Diagram
sequenceDiagram
participant Client as Message Consumer
participant Strategy as SimpleQueueProcessingStrategy
participant Decoder as Decoder Function
participant Pool as FixedQueuePool
participant Worker as OrderedQueueWorker
participant Processor as Result Processor
participant Tracker as OffsetTracker
participant CommitLoop as Commit Loop
participant Commit as Commit Function
Client->>Strategy: submit(message)
Strategy->>Decoder: decode(payload)
Decoder-->>Strategy: result (T)
Strategy->>Strategy: grouping_fn(result)
Strategy->>Pool: submit(group_key, WorkItem)
Pool->>Worker: enqueue WorkItem
Worker->>Worker: run() processes queue
Worker->>Processor: result_processor(subscription_id, result)
Processor-->>Worker: complete
Worker->>Tracker: complete_offset(partition, offset)
Tracker->>Tracker: mark offset complete
CommitLoop->>CommitLoop: periodic tick
CommitLoop->>Tracker: get_committable_offsets()
Tracker-->>CommitLoop: {partition: offset_map}
CommitLoop->>Commit: commit_function(offset_map)
Commit->>Commit: persist offsets
CommitLoop->>Tracker: mark_committed(partition, offset)
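To make the commit path in the diagram concrete, a periodic commit loop might look roughly like the sketch below. The tracker and commit interfaces are simplified assumptions based on the diagram, not the PR's exact signatures.

import threading

def commit_loop(shutdown_event: threading.Event, offset_tracker, commit_function, interval: float = 1.0) -> None:
    """Periodically flush the highest contiguous completed offset per partition."""
    while not shutdown_event.is_set():
        shutdown_event.wait(interval)
        # offset_tracker is assumed to expose get_committable_offsets() and
        # mark_committed(); commit_function takes a {partition: offset} mapping.
        committable = offset_tracker.get_committable_offsets()
        if committable:
            commit_function(committable)
            for partition, offset in committable.items():
                offset_tracker.mark_committed(partition, offset)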
Estimated Code Review Effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@src/sentry/remote_subscriptions/consumers/queue_consumer.py`:
- Around line 127-156: The run method currently always calls
offset_tracker.complete_offset in the finally block, so exceptions from
result_processor are swallowed and offsets advance even on failure; change the
logic so complete_offset(work_item.partition, work_item.offset) is only invoked
on successful processing (i.e., after result_processor returns without
exception) and not in the finally block, keep the metrics.gauge update in
finally or adjust to report failures separately, and ensure error paths (inside
the except Exception in run) do not call complete_offset so failed messages can
be retried on restart.
- Around line 344-345: The join method on the ProcessingStrategy implementation
currently ignores its timeout argument; update queue_consumer.QueueConsumer.join
to forward the timeout to the shutdown path (e.g., call self.close(timeout) or
pass timeout into the internal join/wait calls used by close) so callers like
StreamProcessor get a bounded shutdown; ensure the close signature and any
internal join methods accept and use the timeout (propagate the timeout through
close -> internal join calls).
In `@tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py`:
- Around line 375-421: The test leaves the SimpleQueueProcessingStrategy's
background commit thread running because factory.shutdown() only stops the queue
pool; after creating the strategy via MockFactory.create_with_partitions you
must call strategy.close() to stop the strategy's commit thread before calling
factory.shutdown(); update
TestThreadQueueParallelIntegration.test_factory_creates_thread_queue_parallel_strategy
to invoke strategy.close() prior to factory.shutdown() to avoid the resource
leak.
In `@tests/sentry/uptime/consumers/test_results_consumer.py`:
- Around line 1874-1924: The test's assertion is flaky because
OrderedQueueWorker.run marks offsets in a finally block (so failed messages are
still completed), so update test_thread_queue_parallel_error_handling to expect
commits: after submitting two messages, wait for the queue_pool to drain and for
committed_offsets to contain test_partition, then assert
committed_offsets[test_partition] >= 101 (or that the partition exists and the
highest committed offset is the second message's offset). Use the existing
track_commits, factory.queue_pool and mock_processor_call loop to wait
deterministically for the commit loop before asserting; reference
OrderedQueueWorker.run behavior and
UptimeResultsStrategyFactory/create_with_partitions when locating the logic to
adjust the test expectation.
🧹 Nitpick comments (6)
src/sentry/remote_subscriptions/consumers/queue_consumer.py (4)
49-54: Minor: `_get_partition_lock` has a benign TOCTOU window. Two threads could both see None from `.get()` and create separate `Lock` objects, but `setdefault` is atomic under CPython's GIL, so only one lock wins and both callers get the same instance. This is safe in practice, but if you want to be explicit, you could use a single `defaultdict(threading.Lock)` for `partition_locks` or protect with a global lock.
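A minimal sketch of the `defaultdict` alternative mentioned above; the module-level layout is illustrative, not the PR's actual code:

import threading
from collections import defaultdict

# defaultdict builds the Lock lazily on first access, which makes the
# one-lock-per-partition intent explicit without a get()/setdefault() dance.
partition_locks: defaultdict[int, threading.Lock] = defaultdict(threading.Lock)

def _get_partition_lock(partition: int) -> threading.Lock:
    return partition_locks[partition]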
83-93: `get_committable_offsets` is O(offset_range), not O(tracked_offsets). The `range(start, max_offset + 1)` loop iterates over every integer in the offset range, not just tracked offsets. If a partition has a sparse set of tracked offsets (e.g., offsets 100 and 10000), this iterates ~9900 times even though only 2 offsets are tracked. In practice this is bounded by processing throughput, but worth keeping in mind if offset values become sparse.
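For illustration, an alternative that scales with the number of tracked offsets could walk them in sorted order and stop at the first outstanding one. The two sets below are an assumed simplification of the tracker's internal state, not its actual attributes:

def highest_committable(
    all_offsets: set[int], outstanding: set[int], last_committed: int
) -> int | None:
    # Walk only tracked offsets in sorted order instead of every integer in
    # range(last_committed + 1, max_offset + 1); stop at the first offset
    # that has not finished processing.
    committable = None
    for offset in sorted(all_offsets):
        if offset <= last_committed:
            continue
        if offset in outstanding:
            break
        committable = offset
    return committable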
273-291: Commit loop waits 1 second before first commit check. The `self.shutdown_event.wait(1.0)` at the top of the loop body (line 276) means the first commit check doesn't happen until ~1 second after the strategy is created, regardless of how quickly messages are processed. This is likely fine for production (it matches the `ONCE_PER_SECOND` policy), but it explains why some tests need to wait longer for commits to appear.
293-325: Exception during `submit` after `add_offset` but before `queue_pool.submit` could orphan offsets. If `self.decoder` succeeds and `self.grouping_fn` succeeds, but `self.queue_pool.submit` fails (line 315), the exception handler (lines 317-325) checks `isinstance(message.value, BrokerValue)` and adds + completes the offset. However, `queue_pool.submit` calls `offset_tracker.add_offset` before `work_queue.put` (lines 211-212). If `put` raises, the offset is added but never completed, and the exception handler adds it again. The `add_offset` call at line 211 adds the offset to `all_offsets` and `outstanding`, then the handler at lines 320-325 adds it again (it's a set, so no duplicate) and completes it. This is actually safe because sets handle duplicates, but the flow is subtle and could benefit from a comment.
src/sentry/remote_subscriptions/consumers/result_consumer.py (1)
205-212: Consider using `elif` for the mode dispatch chain. The mode flags are mutually exclusive, but the current `if`/`if`/`if`/`else` structure doesn't make that obvious. Using `elif` would make the intent clearer and prevent accidental fall-through if a future change removes an early `return`.
Proposed refactor
 def create_with_partitions(
     self,
     commit: Commit,
     partitions: Mapping[Partition, int],
 ) -> ProcessingStrategy[KafkaPayload]:
     if self.batched_parallel:
         return self.create_thread_parallel_worker(commit)
-    if self.parallel:
+    elif self.parallel:
         return self.create_multiprocess_worker(commit)
-    if self.thread_queue_parallel:
+    elif self.thread_queue_parallel:
         return self.create_thread_queue_parallel_worker(commit)
     else:
         return self.create_serial_worker(commit)
tests/sentry/uptime/consumers/test_results_consumer.py (1)
1824-1872: Polling loop could time out without asserting failure. Lines 1860-1867: if the loop times out (queue never empties or no commits appear), execution falls through to the assertions at lines 1869-1870, which would fail with a less informative error. Consider adding an explicit timeout assertion like the other tests do: `assert ..., "Processing did not complete in time"`.
Proposed improvement
 queue_pool = factory.queue_pool
 max_wait = 20
+completed = False
 for _ in range(max_wait):
     assert queue_pool is not None
     stats = queue_pool.get_stats()
     if stats["total_items"] == 0 and len(committed_offsets) > 0:
+        completed = True
         break
-    time.sleep(0.1)
+assert completed, "Processing or commit did not complete in time"
def run(self) -> None:
    """Process items from the queue in order."""
    while not self.shutdown:
        try:
            work_item = self.work_queue.get()
        except queue.ShutDown:
            break

        try:
            with sentry_sdk.start_transaction(
                op="queue_worker.process",
                name=f"monitors.{self.identifier}.worker_{self.worker_id}",
            ):
                self.result_processor(self.identifier, work_item.result)

        except queue.ShutDown:
            break
        except Exception:
            logger.exception(
                "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
            )
        finally:
            self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
            metrics.gauge(
                "remote_subscriptions.queue_worker.queue_depth",
                self.work_queue.qsize(),
                tags={
                    "identifier": self.identifier,
                },
            )
Errors in processing are silently swallowed, and the offset still advances.
complete_offset is called in the finally block (line 149), so even when result_processor raises an exception, the offset is marked as completed and will be committed. This means failed messages are permanently skipped with only a log entry. If this is intentional (at-most-once semantics), consider documenting it explicitly. If not, the offset should only be completed on success to allow re-processing on restart.
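If re-processing after a restart is the desired behavior, the loop could complete the offset only on the success path, roughly like the sketch below. This is an illustrative rewrite of the quoted method (the sentry_sdk transaction wrapper is omitted for brevity), not the PR's actual change:

def run(self) -> None:
    """Process items from the queue in order; offsets advance only on success."""
    while not self.shutdown:
        try:
            work_item = self.work_queue.get()
        except queue.ShutDown:
            break

        try:
            self.result_processor(self.identifier, work_item.result)
        except Exception:
            # Leave the offset outstanding so it is not committed and the
            # message can be re-processed after a restart.
            logger.exception(
                "Unexpected error in queue worker", extra={"worker_id": self.worker_id}
            )
        else:
            # Only a successful result_processor call advances the offset.
            self.offset_tracker.complete_offset(work_item.partition, work_item.offset)
        finally:
            metrics.gauge(
                "remote_subscriptions.queue_worker.queue_depth",
                self.work_queue.qsize(),
                tags={"identifier": self.identifier},
            )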
🤖 Prompt for AI Agents
In `@src/sentry/remote_subscriptions/consumers/queue_consumer.py` around lines 127
- 156, The run method currently always calls offset_tracker.complete_offset in
the finally block, so exceptions from result_processor are swallowed and offsets
advance even on failure; change the logic so
complete_offset(work_item.partition, work_item.offset) is only invoked on
successful processing (i.e., after result_processor returns without exception)
and not in the finally block, keep the metrics.gauge update in finally or adjust
to report failures separately, and ensure error paths (inside the except
Exception in run) do not call complete_offset so failed messages can be retried
on restart.
def join(self, timeout: float | None = None) -> None:
    self.close()
join() ignores the timeout parameter.
The timeout parameter from the ProcessingStrategy interface is unused. Callers (like StreamProcessor) may rely on it to bound shutdown time. Consider passing timeout through to close() or at least to the internal join calls.
Proposed fix
-    def join(self, timeout: float | None = None) -> None:
-        self.close()
+    def join(self, timeout: float | None = None) -> None:
+        self.shutdown_event.set()
+        self.commit_thread.join(timeout=timeout)
+        self.queue_pool.shutdown()
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 344-344: Unused method argument: timeout
(ARG002)
🤖 Prompt for AI Agents
In `@src/sentry/remote_subscriptions/consumers/queue_consumer.py` around lines 344
- 345, The join method on the ProcessingStrategy implementation currently
ignores its timeout argument; update queue_consumer.QueueConsumer.join to
forward the timeout to the shutdown path (e.g., call self.close(timeout) or pass
timeout into the internal join/wait calls used by close) so callers like
StreamProcessor get a bounded shutdown; ensure the close signature and any
internal join methods accept and use the timeout (propagate the timeout through
close -> internal join calls).
class TestThreadQueueParallelIntegration(TestCase):
    """Integration test with the ResultsStrategyFactory."""

    def test_factory_creates_thread_queue_parallel_strategy(self):
        """Test that the factory properly creates thread-queue-parallel strategy."""
        from sentry.remote_subscriptions.consumers.result_consumer import (
            ResultProcessor,
            ResultsStrategyFactory,
        )

        class MockResultProcessor(ResultProcessor):
            @property
            def subscription_model(self):
                return mock.Mock()

            def get_subscription_id(self, result):
                return result.get("subscription_id", "unknown")

            def handle_result(self, subscription, result):
                pass

        class MockFactory(ResultsStrategyFactory):
            @property
            def topic_for_codec(self):
                return Topic("test")

            @property
            def result_processor_cls(self):
                return MockResultProcessor

            def build_payload_grouping_key(self, result):
                return result.get("subscription_id", "unknown")

            @property
            def identifier(self):
                return "test"

        factory = MockFactory(mode="thread-queue-parallel", max_workers=5)
        commit = mock.Mock()
        partition = Partition(Topic("test"), 0)
        strategy = factory.create_with_partitions(commit, {partition: 0})

        assert isinstance(strategy, SimpleQueueProcessingStrategy)
        assert factory.queue_pool is not None
        assert factory.queue_pool.num_queues == 5

        factory.shutdown()
Resource leak: strategy commit thread is never stopped.
The SimpleQueueProcessingStrategy returned at line 415 starts a daemon commit thread. factory.shutdown() at line 421 shuts down the queue pool but doesn't stop the strategy's commit thread. Call strategy.close() before factory.shutdown() to clean up properly.
Proposed fix
 assert isinstance(strategy, SimpleQueueProcessingStrategy)
 assert factory.queue_pool is not None
 assert factory.queue_pool.num_queues == 5
+strategy.close()
 factory.shutdown()
🤖 Prompt for AI Agents
factory.shutdown()🤖 Prompt for AI Agents
In `@tests/sentry/remote_subscriptions/consumers/test_queue_consumer.py` around
lines 375 - 421, The test leaves the SimpleQueueProcessingStrategy's background
commit thread running because factory.shutdown() only stops the queue pool;
after creating the strategy via MockFactory.create_with_partitions you must call
strategy.close() to stop the strategy's commit thread before calling
factory.shutdown(); update
TestThreadQueueParallelIntegration.test_factory_creates_thread_queue_parallel_strategy
to invoke strategy.close() prior to factory.shutdown() to avoid the resource
leak.
def test_thread_queue_parallel_error_handling(self) -> None:
    """
    Test that errors in processing don't block offset commits for other messages.
    """
    committed_offsets: dict[Partition, int] = {}

    def track_commits(offsets: Mapping[Partition, int], force: bool = False) -> None:
        committed_offsets.update(offsets)

    factory = UptimeResultsStrategyFactory(
        mode="thread-queue-parallel",
        max_workers=2,
    )

    test_partition = Partition(Topic("test"), 1)
    consumer = factory.create_with_partitions(track_commits, {test_partition: 0})

    with mock.patch.object(type(factory.result_processor), "__call__") as mock_processor_call:
        mock_processor_call.side_effect = [Exception("Processing failed"), None]

        codec = kafka_definition.get_topic_codec(kafka_definition.Topic.UPTIME_RESULTS)

        for offset, minutes in [(100, 5), (101, 4)]:
            result = self.create_uptime_result(
                self.subscription.subscription_id,
                scheduled_check_time=datetime.now() - timedelta(minutes=minutes),
            )
            message = Message(
                BrokerValue(
                    KafkaPayload(None, codec.encode(result), []),
                    test_partition,
                    offset,
                    datetime.now(),
                )
            )
            consumer.submit(message)

        queue_pool = factory.queue_pool
        max_wait = 20
        for _ in range(max_wait):
            assert queue_pool is not None
            stats = queue_pool.get_stats()
            if stats["total_items"] == 0 and mock_processor_call.call_count >= 2:
                time.sleep(0.2)
                break
            time.sleep(0.1)

        assert mock_processor_call.call_count == 2
        assert len(committed_offsets) == 0 or test_partition not in committed_offsets

    factory.shutdown()
Error handling test assertion may be incorrect or timing-dependent.
The OrderedQueueWorker.run method completes the offset in a finally block (line 149 of queue_consumer.py), meaning even on exception, the offset is marked as processed. Both offsets (100 and 101) should be completed and eventually committed by the commit loop. The assertion at line 1922 (len(committed_offsets) == 0 or test_partition not in committed_offsets) only passes if the commit loop hasn't executed yet — a timing-dependent condition that makes this test flaky.
Either:
- The `finally`-based offset completion is wrong and should only happen on success (in which case this test is correct, but the implementation has the bug I flagged in queue_consumer.py).
- The implementation is intentional (at-most-once), and this assertion should expect the offsets to be committed (see the sketch below).
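If the at-most-once behavior is kept, the test's final assertions could wait for the commit and expect it, roughly as sketched here. Variable names follow the quoted test; the wait-loop bounds are illustrative:

# Wait for the commit loop instead of racing it.
for _ in range(50):
    if test_partition in committed_offsets:
        break
    time.sleep(0.1)

assert mock_processor_call.call_count == 2
# Under at-most-once semantics both offsets complete, so the committed
# offset should reach at least the second message's offset.
assert committed_offsets.get(test_partition, 0) >= 101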
🧰 Tools
🪛 Ruff (0.14.14)
[warning] 1880-1880: Unused function argument: force
(ARG001)
🤖 Prompt for AI Agents
In `@tests/sentry/uptime/consumers/test_results_consumer.py` around lines 1874 -
1924, The test's assertion is flaky because OrderedQueueWorker.run marks offsets
in a finally block (so failed messages are still completed), so update
test_thread_queue_parallel_error_handling to expect commits: after submitting
two messages, wait for the queue_pool to drain and for committed_offsets to
contain test_partition, then assert committed_offsets[test_partition] >= 101 (or
that the partition exists and the highest committed offset is the second
message's offset). Use the existing track_commits, factory.queue_pool and
mock_processor_call loop to wait deterministically for the commit loop before
asserting; reference OrderedQueueWorker.run behavior and
UptimeResultsStrategyFactory/create_with_partitions when locating the logic to
adjust the test expectation.
This pull request was automatically created by @coderabbitai/e2e-reviewer.
Batch created pull request.